package cn.benma666.sjsj.job;

import cn.benma666.constants.UtilConst;
import cn.benma666.constants.UtilConstInstance;
import cn.benma666.dict.Cllx;
import cn.benma666.dict.XtZtm;
import cn.benma666.dict.Xzms;
import cn.benma666.dict.Yxzt;
import cn.benma666.domain.SysQxYhxx;
import cn.benma666.exception.MyException;
import cn.benma666.iframe.Conf;
import cn.benma666.iframe.MyParams;
import cn.benma666.iframe.PageInfo;
import cn.benma666.iframe.Result;
import cn.benma666.myutils.DateUtil;
import cn.benma666.json.JsonUtil;
import cn.benma666.myutils.StringUtil;
import cn.benma666.params.Znjh;
import cn.benma666.iframe.HdInterface;
import cn.benma666.sjsj.web.LjqManager;
import cn.benma666.sjsj.web.UserManager;
import cn.benma666.sjzt.SjsjField;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.xxl.job.core.handler.annotation.XxlJob;
import org.jetbrains.annotations.NotNull;
import org.springframework.stereotype.Component;

import java.net.HttpURLConnection;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

/**
 * 数据同步
 */
@Component
@XxlJob(value = "SjtbHandler",init = "init")
public class SjtbJob extends BasicJob implements UtilConstInstance {
    /**
     * 任务体
     */
    public void execute() throws Exception {
        info("开始数据同步");
        //输入相关参数
        MyParams srcs = MyParams.parseObject(getConfig().getString("jcsr"));
        Znjh znjh = srcs.znjh();
        if(!isBlank(getZlsjc())){
            //任务中有增量则设置，没有则采用默认
            znjh.setStart(getZlsjc());
        }
        String start = znjh.getStart();
        if(!isBlank(start)&&start.length()==14){
            //开始值长度为14时才设置当前时间为结束
            znjh.setEnd(DateUtil.getGabDate());
        }
        PageInfo page = srcs.getObject(KEY_PAGE, PageInfo.class);
        // 增量字段
        String zlzd = srcs.sjdx().getZlzd();
        if(!isBlank(zlzd)){
            //设置按增量字段排序
            page.setOrderBy(zlzd);
            srcs.yobj().set(srcs.sjdx().getZddmZlzd(), Arrays.asList(start,znjh.getEnd()));
        }
        //数据传输不进行总量统计
        page.setTotalRequired(false);
        if(!isBlank(srcs.znjh().getSql())){
            srcs.sjdx().setDxdm("SYS_COMMON_XNDX");
            srcs.sys().setCllx(Cllx.select.name());
        }
        String sruser = StringUtil.fastNotNull(srcs.znjh().getUser(),Conf.getUtilConfig().getZnjh().getUser(), UtilConst.SYS);
        //输出相关参数
        MyParams sccs = new MyParams(getConfig().getJSONObject("jcsc"));
        //读取量
        AtomicInteger dql = new AtomicInteger();
        AtomicInteger cfl = new AtomicInteger();
        //主键字段
        String zjzd = valByDef(srcs.sjdx().getZddmZjzd(), SjsjField.id.name());
        //最大保存ids的数量
        int maxIdsSize = valByDef(getConfig().getInteger("$.sjtb.maxIdsSize"),10000);
        //设置新的主键集合
        List<String> newIds = new ArrayList<>();
        AtomicReference<String> zlsjc = new AtomicReference<>();
        @NotNull SysQxYhxx user = UserManager.findUser(sruser);
        HdInterface hdff = (list, hdjs) -> {
            if(isBlank(list)){
                return success("数据列表为空，没有需要处理的数据");
            }
            debug("回调样例数据：{}",list.get(0));
            dql.addAndGet(list.size());
            if(!isBlank(zjzd)&&!isBlank(zlzd)){
                //新时间戳
                String xsjc = list.get(list.size() - 1).getString(zlzd);
                if(isBlank(xsjc)){
                    throw new MyException("从最后一行读取的最新时间戳为空："+list.get(list.size() - 1));
                }
                if(!(xsjc.equals(zlsjc.get()))){
                    //时间戳变化了
                    zlsjc.set(list.get(list.size() - 1).getString(zlzd));
                    newIds.clear();
                }
                for (int i=list.size()-1;i>=0;i--) {
                    JSONObject jsonObject = list.get(i);
                    if (jsonObject.getString(zlzd).equals(zlsjc.get())) {
                        if(newIds.size()>=maxIdsSize){
                            error("ids达到最大限制："+maxIdsSize);
                            break;
                        }
                        newIds.add(jsonObject.getString(zjzd));
                    }else {
                        break;
                    }
                }
                //移除与上次重复的数据
                if (!isBlank(getIds()) && !isBlank(start)) {
                    JSONArray ids = JSON.parseArray(getIds());
                    Iterator<JSONObject> iter = list.iterator();
                    while (iter.hasNext()) {
                        JSONObject n = iter.next();
                        if(start.equals(n.getString(zlzd))){
                            if (ids.contains(n.getString(zjzd))) {
                                //时间一样、id一样则算重复
                                iter.remove();
                                cfl.getAndIncrement();
                            }
                        }else {
                            //数据按增量字段递增排序
                            break;
                        }
                    }
                }
            }
            if(isBlank(list)){
                return success("数据列表去重后为空，没有需要处理的数据");
            }
            MyParams scJcxx = LjqManager.jcxxByParams(JsonUtil.clone(sccs), user);
            //设置下载模式，处理文件传输场景
            scJcxx.sys().setXzms(Xzms.BASE64OBJ.getCode());
            scJcxx.sys().setEditTableData(list);
            scJcxx.other().getYsParams().sys().setEditTableData(list);
            Result result = LjqManager.data(scJcxx);
            if (result.getCode() == HttpURLConnection.HTTP_OK) {
                //正常状态
                info("查询回调完成："+result.getMsg());
            }else if(result.getCode() == XtZtm.CTDYCS.getCode()){
                //穿透调用超时
                info("穿透调用超时按正常处理，不再重复推送："+result.getMsg());
            }else {
                throw result.newMyException();
            }
            return result;
        };
        srcs.sys().setHdcst(hdff);
        //内部调用
        MyParams jcxx = LjqManager.jcxxByParams(srcs,UserManager.findUser(sruser));
        Result result = LjqManager.data(jcxx);
        if(!result.isStatus()){
            throw new MyException("数据同步异常："+result);
        }
        List list = result.getData(PageInfo.class).getList();
        if(!isBlank(list)){
            info("调用结果存在数据进行处理，兼容重新select后没有实现回调支持的场景");
            hdff.run(list,true);
        }
        setDql(dql.get());
        putLzrz(dql.get()- cfl.get(),cfl.get(),0);
        if (dql.get()- cfl.get() == 0) {
            setJg(Yxzt.SUCCESS.getCode());
            info("没查询到数据");
        }else if(!isBlank(zlzd)){
            //设置新的增量时间戳
            setZlsjc(zlsjc.get());
            setIds(newIds);
        }
        info("结束数据同步");
    }
}
